package connection

import (
	"context"
	"fmt"
	"github.com/IBM/sarama"
	"github.com/spf13/viper"
	"os"
	"os/signal"
	"sync"
	"time"
)

// 暂时未使用到
type consumerGroupHandler struct {
	name string
}

func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

var idx = 0
var now = time.Now()

func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		idx++
		if idx%1000 == 0 {
			fmt.Println("接收订阅【耗时秒", (time.Now()).Unix()-now.Unix())
			fmt.Println("接收订阅【耗时毫秒", ((time.Now()).UnixNano()/1e6)-(now.UnixNano()/1e6))
			fmt.Println("接收订阅【", string(msg.Key), "】—————", idx, "—————接收订阅完毕—————————!", idx)
			fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		}
		// 手动确认消息
		sess.MarkMessage(msg, "")
		//sess.Commit()
	}
	return nil
}

func handleErrors(group *sarama.ConsumerGroup, wg *sync.WaitGroup) {
	wg.Done()
	for err := range (*group).Errors() {
		fmt.Println("ERROR", err)
	}
}

func consume(group *sarama.ConsumerGroup, wg *sync.WaitGroup, groupId string) {
	fmt.Println(groupId + "start")
	wg.Done()
	ctx := context.Background()
	for {
		topics := []string{"my_topic"}
		handler := consumerGroupHandler{name: groupId}
		err := (*group).Consume(ctx, topics, handler)
		if err != nil {
			panic(err)
		}
	}
}

func KafkaConsumerGroup() {
	var wg sync.WaitGroup
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = false
	config.Consumer.Offsets.AutoCommit.Enable = false
	//config.Consumer.Offsets.Initial = sarama.OffsetNewest

	config.Version = sarama.V3_3_2_0
	client, err := sarama.NewClient(viper.GetStringSlice("receive.kafka.brokerIpPort"), config)
	//client, err := sarama.NewClient([]string{"192.168.0.104:9092", "localhost:9292", "localhost:9392"}, config)
	if err != nil {
		panic(err)
	}
	defer client.Close()
	//var list [10]sarama.ConsumerGroup
	wg.Add(1)
	//for i := 0; i < 10; i++ {
	//	var groupId = "c" + strconv.Itoa(i)
	//	group, err := sarama.NewConsumerGroupFromClient(groupId, client)
	//	if err != nil {
	//		panic(err)
	//	}
	//	list[i] = group
	//	defer (list[i]).Close()
	//	go consume(&(list[i]), &wg, groupId)
	//}
	group1, err := sarama.NewConsumerGroupFromClient("c5", client)
	now = time.Now()
	fmt.Println("接收订阅链接成功", (time.Now()).Unix()-now.Unix())
	//group2, err := sarama.NewConsumerGroupFromClient("c2", client)
	//if err != nil {
	//	panic(err)
	//}
	//group3, err := sarama.NewConsumerGroupFromClient("c3", client)
	//if err != nil {
	//	panic(err)
	//}
	defer group1.Close()
	//defer group2.Close()
	//defer group3.Close()
	//wg.Add(3)
	go consume(&group1, &wg, "c5")
	//go consume(&group2, &wg, "c2")
	//go consume(&group3, &wg, "c3")
	wg.Wait()
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	select {
	case <-signals:
		fmt.Println(" <-signals:")
	}
}
